# Provide input/output streams for transforming Monasca logs.
# Filters should be provided in other configuration files.

input {
    kafka {
        zk_connect => "{{ monasca_zookeeper_servers }}"
        topic_id => "{{ monasca_raw_logs_topic }}"
        group_id => "log_transformer"
        consumer_id => "log_transformer_{{ ansible_hostname }}"
        consumer_threads => "{{ monasca_log_pipeline_threads }}"
    }
}

filter {
    # Update the timestamp of the event based on the time in the message.
    date {
        match => [ "[log][dimensions][timestamp]", "yyyy-MM-dd HH:mm:ss Z", "ISO8601"]
        remove_field => [ "[log][dimensions][timestamp]", "[log][dimensions][Timestamp]" ]
    }

    # Monasca Log API adds a timestamp when it processes a log entry. This
    # timestamp needs to be converted from seconds since the epoch for
    # Elasticsearch to parse it correctly. Here we make that conversion.
    date {
        match => ["creation_time", "UNIX"]
        target => "creation_time"
    }

    # OpenStack log levels are uppercase, and syslog are lowercase.
    # Furthermore, syslog has more log levels that OpenStack. To avoid
    # mapping syslog log levels to OpenStack log levels, we standardise
    # on the syslog style here.
    if [log][dimensions][log_level] {
        mutate {
            lowercase => [ "[log][dimensions][log_level]" ]
        }
    }
}

output {
    kafka {
        bootstrap_servers => "{{ monasca_kafka_servers }}"
        topic_id => "{{ monasca_transformed_logs_topic }}"
        client_id => "log_transformer_{{ ansible_hostname }}"
        workers => {{ monasca_log_pipeline_threads|int }}
    }
}
